Prophet forecaster (wrapper + from-scratch)#
Goals#
Implement a
Prophet(...)forecaster interface that can:wrap the
prophetPython package (Meta/Facebook Prophet) when availablefall back to a lightweight Prophet-like implementation from scratch (NumPy)
Understand Prophet’s core additive structure, changepoints, and Fourier seasonality.
Prerequisites#
Required:
numpy,pandas,plotlyOptional:
prophet(the official implementation)
If you want the wrapper to use the official library:
pip install prophet
Note: the official Prophet stack may download/compile Stan tooling depending on your setup.
Prophet model (high level)#
Prophet is a decomposable additive model:
\(g(t)\): trend (often piecewise linear with changepoints, or logistic growth)
\(s(t)\): seasonality (typically Fourier series for weekly/yearly patterns)
\(h(t)\): holiday / event effects (indicator features)
\(\varepsilon_t\): noise
This notebook implements a piecewise linear trend + Fourier seasonality baseline.
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"
np.set_printoptions(precision=4, suppress=True)
def _as_prophet_df(y: pd.Series | pd.DataFrame, *, y_col: str = "y") -> pd.DataFrame:
"""Coerce input into Prophet-style DataFrame with columns: ds (datetime), y (float)."""
if isinstance(y, pd.Series):
if y.index is None:
raise ValueError("If y is a Series, it must have a DatetimeIndex")
df = pd.DataFrame({"ds": pd.to_datetime(y.index), "y": y.astype(float).to_numpy()})
else:
if "ds" not in y.columns:
raise ValueError("If y is a DataFrame, it must contain a 'ds' column")
if y_col not in y.columns:
raise ValueError(f"If y is a DataFrame, it must contain a '{y_col}' column")
df = y[["ds", y_col]].rename(columns={y_col: "y"}).copy()
df["ds"] = pd.to_datetime(df["ds"])
df["y"] = df["y"].astype(float)
df = df.dropna(subset=["ds", "y"]).sort_values("ds").reset_index(drop=True)
if df.empty:
raise ValueError("No data after dropping NaNs")
return df
def _infer_freq(ds: pd.Series) -> str:
"""Infer a pandas frequency string from a datetime series."""
ds = pd.to_datetime(ds)
inferred = pd.infer_freq(ds)
if inferred is not None:
return inferred
deltas = ds.sort_values().diff().dropna()
if deltas.empty:
return "D"
median = deltas.median()
# Coarse mapping for common cases
if median <= pd.Timedelta(hours=1):
return "H"
if median <= pd.Timedelta(days=1):
return "D"
if median <= pd.Timedelta(days=7):
return "W"
return "D"
def _future_ds(last_ds: pd.Timestamp, steps: int, freq: str) -> pd.DatetimeIndex:
last_ds = pd.to_datetime(last_ds)
if steps < 1:
return pd.DatetimeIndex([])
start = last_ds + pd.tseries.frequencies.to_offset(freq)
return pd.date_range(start=start, periods=int(steps), freq=freq)
def fourier_features(t_days: np.ndarray, period_days: float, fourier_order: int) -> np.ndarray:
"""Fourier series features used by Prophet for seasonality.
Returns shape (n, 2*fourier_order): [sin(2πnt/P), cos(2πnt/P)] for n=1..N.
"""
t_days = np.asarray(t_days, dtype=float).reshape(-1)
if fourier_order <= 0:
return np.zeros((t_days.size, 0), dtype=float)
n = np.arange(1, int(fourier_order) + 1, dtype=float)
x = 2.0 * np.pi * (t_days[:, None] * n[None, :] / float(period_days))
return np.concatenate([np.sin(x), np.cos(x)], axis=1)
def changepoint_features(t_scaled: np.ndarray, changepoints_scaled: np.ndarray) -> np.ndarray:
"""Piecewise-linear basis: max(0, t - s_j) for each changepoint s_j."""
t_scaled = np.asarray(t_scaled, dtype=float).reshape(-1)
changepoints_scaled = np.asarray(changepoints_scaled, dtype=float).reshape(-1)
if changepoints_scaled.size == 0:
return np.zeros((t_scaled.size, 0), dtype=float)
return np.maximum(0.0, t_scaled[:, None] - changepoints_scaled[None, :])
def ridge_fit(X: np.ndarray, y: np.ndarray, l2: np.ndarray) -> np.ndarray:
"""Solve (X^T X + diag(l2)) beta = X^T y."""
X = np.asarray(X, dtype=float)
y = np.asarray(y, dtype=float).reshape(-1)
l2 = np.asarray(l2, dtype=float).reshape(-1)
if l2.size != X.shape[1]:
raise ValueError("l2 must have same length as number of columns in X")
A = X.T @ X + np.diag(l2)
b = X.T @ y
return np.linalg.solve(A, b)
class ProphetLike:
"""A lightweight Prophet-like forecaster (trend + Fourier seasonality).
This is NOT the full Bayesian Stan-based Prophet implementation.
It approximates Prophet's core idea using regularized linear regression.
"""
def __init__(
self,
*,
freq: str | None = None,
n_changepoints: int = 25,
changepoint_range: float = 0.8,
yearly_seasonality: int | bool = 10,
weekly_seasonality: int | bool = 3,
daily_seasonality: int | bool = False,
add_seasonality: list[dict] | None = None,
lambda_changepoints: float = 10.0,
lambda_seasonality: float = 1.0,
):
self.freq = freq
self.n_changepoints = int(n_changepoints)
self.changepoint_range = float(changepoint_range)
self.yearly_seasonality = yearly_seasonality
self.weekly_seasonality = weekly_seasonality
self.daily_seasonality = daily_seasonality
self.add_seasonality = add_seasonality or []
self.lambda_changepoints = float(lambda_changepoints)
self.lambda_seasonality = float(lambda_seasonality)
if not (0 < self.changepoint_range <= 1):
raise ValueError("changepoint_range must be in (0, 1]")
@staticmethod
def _seasonality_order(value: int | bool, default_if_true: int) -> int:
if value is False or value is None:
return 0
if value is True:
return int(default_if_true)
return int(value)
def _build_matrix(self, ds: pd.Series, changepoints_scaled: np.ndarray):
ds = pd.to_datetime(ds)
t_days = (ds - self.ds0_).dt.total_seconds().to_numpy() / 86400.0
t_scaled = t_days / self.t_scale_
# Trend basis: intercept + time + changepoint hinge functions
A = changepoint_features(t_scaled, changepoints_scaled)
X_trend = np.column_stack([np.ones(ds.size), t_scaled, A])
# Seasonality basis
X_season_parts = []
season_names = []
yearly_order = self._seasonality_order(self.yearly_seasonality, default_if_true=10)
weekly_order = self._seasonality_order(self.weekly_seasonality, default_if_true=3)
daily_order = self._seasonality_order(self.daily_seasonality, default_if_true=4)
if yearly_order > 0:
X_season_parts.append(fourier_features(t_days, period_days=365.25, fourier_order=yearly_order))
season_names.append("yearly")
if weekly_order > 0:
X_season_parts.append(fourier_features(t_days, period_days=7.0, fourier_order=weekly_order))
season_names.append("weekly")
if daily_order > 0:
X_season_parts.append(fourier_features(t_days, period_days=1.0, fourier_order=daily_order))
season_names.append("daily")
for spec in self.add_seasonality:
name = spec["name"]
period = float(spec["period"])
order = int(spec["fourier_order"])
X_season_parts.append(fourier_features(t_days, period_days=period, fourier_order=order))
season_names.append(name)
X_season = np.concatenate(X_season_parts, axis=1) if X_season_parts else np.zeros((ds.size, 0))
return t_days, t_scaled, X_trend, X_season, season_names
def fit(self, y: pd.Series | pd.DataFrame):
df = _as_prophet_df(y)
self.freq_ = self.freq or _infer_freq(df["ds"])
self.ds0_ = pd.to_datetime(df["ds"].iloc[0])
t_days = (df["ds"] - self.ds0_).dt.total_seconds().to_numpy() / 86400.0
self.t_scale_ = float(np.max(t_days)) if float(np.max(t_days)) > 0 else 1.0
t_scaled = t_days / self.t_scale_
# Choose changepoints inside the first `changepoint_range` portion of the history
n = df.shape[0]
cp_max = float(np.quantile(t_scaled, self.changepoint_range))
candidate = t_scaled[t_scaled <= cp_max]
cp_count = max(0, min(self.n_changepoints, candidate.size - 2))
if cp_count == 0:
self.changepoints_scaled_ = np.array([], dtype=float)
else:
qs = np.linspace(0, 1, cp_count + 2)[1:-1]
self.changepoints_scaled_ = np.quantile(candidate, qs)
_, _, X_trend, X_season, season_names = self._build_matrix(df["ds"], self.changepoints_scaled_)
self.season_names_ = season_names
self.X_trend_cols_ = X_trend.shape[1]
X = np.concatenate([X_trend, X_season], axis=1)
# Regularization (MAP with Gaussian priors)
l2 = np.zeros(X.shape[1], dtype=float)
# No penalty on intercept and base slope
# Penalty on changepoint hinge terms
if X_trend.shape[1] > 2:
l2[2 : X_trend.shape[1]] = self.lambda_changepoints
# Penalty on seasonality coefficients
if X_season.shape[1] > 0:
l2[X_trend.shape[1] :] = self.lambda_seasonality
self.coef_ = ridge_fit(X, df["y"].to_numpy(), l2)
self.train_ = df
self.fitted_ = self._predict_df(df["ds"], include_components=True)
return self
def _predict_df(self, ds: pd.Series, *, include_components: bool) -> pd.DataFrame:
ds = pd.to_datetime(ds)
t_days, _, X_trend, X_season, _ = self._build_matrix(ds, self.changepoints_scaled_)
X = np.concatenate([X_trend, X_season], axis=1)
yhat = X @ self.coef_
out = pd.DataFrame({"ds": ds, "yhat": yhat})
if include_components:
coef_trend = self.coef_[: X_trend.shape[1]]
coef_season = self.coef_[X_trend.shape[1] :]
out["trend"] = X_trend @ coef_trend
out["seasonality"] = (X_season @ coef_season) if X_season.shape[1] > 0 else 0.0
out["t_days"] = t_days
return out
def predict(self, steps: int = 0, *, ds: pd.Series | None = None) -> pd.DataFrame:
if not hasattr(self, "train_"):
raise RuntimeError("Call fit() first")
if ds is None:
last_ds = pd.to_datetime(self.train_["ds"].iloc[-1])
ds = _future_ds(last_ds, int(steps), self.freq_)
return self._predict_df(pd.Series(ds), include_components=True)
class Prophet:
"""Prophet forecaster interface.
- backend='auto': use `prophet` package if installed, else fallback to ProphetLike
- backend='prophet': require `prophet` package
- backend='from_scratch': force the lightweight ProphetLike implementation
Parameters shown here are intentionally minimal; extend as needed.
"""
def __init__(
self,
*,
freq: str | None = None,
add_seasonality: list[dict] | None = None,
backend: str = "auto",
# common Prophet-ish knobs
n_changepoints: int = 25,
changepoint_range: float = 0.8,
yearly_seasonality: int | bool = 10,
weekly_seasonality: int | bool = 3,
daily_seasonality: int | bool = False,
# regularization for fallback model
lambda_changepoints: float = 10.0,
lambda_seasonality: float = 1.0,
# kwargs passed to official `prophet.Prophet(...)` if available
prophet_kwargs: dict | None = None,
add_country_holidays: str | None = None,
):
self.freq = freq
self.add_seasonality = add_seasonality or []
self.backend = backend
self.n_changepoints = int(n_changepoints)
self.changepoint_range = float(changepoint_range)
self.yearly_seasonality = yearly_seasonality
self.weekly_seasonality = weekly_seasonality
self.daily_seasonality = daily_seasonality
self.lambda_changepoints = float(lambda_changepoints)
self.lambda_seasonality = float(lambda_seasonality)
self.prophet_kwargs = prophet_kwargs or {}
self.add_country_holidays = add_country_holidays
@staticmethod
def _prophet_available() -> bool:
try:
import prophet # noqa: F401
return True
except Exception:
return False
def fit(self, y: pd.Series | pd.DataFrame):
if self.backend not in {"auto", "prophet", "from_scratch"}:
raise ValueError("backend must be one of: auto, prophet, from_scratch")
if self.backend in {"auto", "prophet"} and self._prophet_available():
from prophet import Prophet as ProphetModel
df = _as_prophet_df(y)
self.freq_ = self.freq or _infer_freq(df["ds"])
model = ProphetModel(
yearly_seasonality=self.yearly_seasonality,
weekly_seasonality=self.weekly_seasonality,
daily_seasonality=self.daily_seasonality,
n_changepoints=self.n_changepoints,
changepoint_range=self.changepoint_range,
**self.prophet_kwargs,
)
for spec in self.add_seasonality:
model.add_seasonality(**spec)
if self.add_country_holidays is not None:
model.add_country_holidays(country_name=self.add_country_holidays)
model.fit(df)
self._impl = model
self._train_df = df
self._backend_ = "prophet"
return self
if self.backend == "prophet":
raise ModuleNotFoundError(
"`prophet` is not installed. Install it (e.g. `pip install prophet`) or use backend='from_scratch'."
)
self._impl = ProphetLike(
freq=self.freq,
n_changepoints=self.n_changepoints,
changepoint_range=self.changepoint_range,
yearly_seasonality=self.yearly_seasonality,
weekly_seasonality=self.weekly_seasonality,
daily_seasonality=self.daily_seasonality,
add_seasonality=self.add_seasonality,
lambda_changepoints=self.lambda_changepoints,
lambda_seasonality=self.lambda_seasonality,
).fit(y)
self._backend_ = "from_scratch"
return self
def predict(self, steps: int = 0, *, ds: pd.Series | None = None) -> pd.DataFrame:
if not hasattr(self, "_impl"):
raise RuntimeError("Call fit() first")
if self._backend_ == "prophet":
if ds is None:
last_ds = pd.to_datetime(self._train_df["ds"].iloc[-1])
ds = _future_ds(last_ds, int(steps), self.freq_)
future = pd.DataFrame({"ds": pd.to_datetime(ds)})
fcst = self._impl.predict(future)
keep = [c for c in ["ds", "yhat", "yhat_lower", "yhat_upper"] if c in fcst.columns]
return fcst[keep].copy()
return self._impl.predict(steps=steps, ds=ds)
# Create a synthetic series with: (1) trend + changepoint, (2) weekly + yearly seasonality
rng = np.random.default_rng(0)
ds = pd.date_range("2018-01-01", periods=900, freq="D")
t = np.arange(ds.size, dtype=float)
# Piecewise linear trend
cp = 450
trend = 0.02 * t + 2.0
trend[cp:] += 0.04 * (t[cp:] - cp) # slope increases after changepoint
# Seasonalities
weekly = 1.5 * np.sin(2 * np.pi * t / 7.0)
yearly = 3.0 * np.sin(2 * np.pi * t / 365.25)
noise = rng.normal(0.0, 0.8, size=ds.size)
y = trend + weekly + yearly + noise
y_series = pd.Series(y, index=ds, name="y")
y_series.head()
2018-01-01 2.100584
2018-01-02 3.138668
2018-01-03 4.117924
2018-01-04 2.949499
2018-01-05 1.206905
Freq: D, Name: y, dtype: float64
# Fit our Prophet interface (will fallback to from-scratch in this environment)
model = Prophet(
freq="D",
backend="from_scratch",
n_changepoints=30,
changepoint_range=0.9,
yearly_seasonality=10,
weekly_seasonality=3,
lambda_changepoints=50.0,
lambda_seasonality=1.0,
).fit(y_series)
pred_future = model.predict(steps=120)
pred_future.head()
| ds | yhat | trend | seasonality | t_days | |
|---|---|---|---|---|---|
| 0 | 2020-06-19 | 36.284578 | 36.113908 | 0.170669 | 900.0 |
| 1 | 2020-06-20 | 35.379175 | 36.167341 | -0.788166 | 901.0 |
| 2 | 2020-06-21 | 35.735823 | 36.220774 | -0.484951 | 902.0 |
| 3 | 2020-06-22 | 36.761379 | 36.274206 | 0.487173 | 903.0 |
| 4 | 2020-06-23 | 37.964227 | 36.327639 | 1.636588 | 904.0 |
# Build a combined frame for plotting
fitted = model._impl.fitted_.copy() if getattr(model, "_backend_", None) == "from_scratch" else None
train_df = _as_prophet_df(y_series)
train_df = train_df.rename(columns={"y": "y"})
fig = go.Figure()
fig.add_trace(go.Scatter(x=train_df["ds"], y=train_df["y"], mode="lines", name="observed"))
if fitted is not None:
fig.add_trace(go.Scatter(x=fitted["ds"], y=fitted["yhat"], mode="lines", name="fitted", line=dict(width=2)))
fig.add_trace(go.Scatter(x=pred_future["ds"], y=pred_future["yhat"], mode="lines", name="forecast", line=dict(width=2)))
# Show changepoint locations (from-scratch backend)
if fitted is not None and getattr(model._impl, "changepoints_scaled_", None) is not None:
cp_scaled = model._impl.changepoints_scaled_
if cp_scaled.size > 0:
cp_days = cp_scaled * model._impl.t_scale_
cp_dates = model._impl.ds0_ + pd.to_timedelta(cp_days, unit="D")
for d in cp_dates:
fig.add_vline(x=d, line_width=1, line_dash="dot", line_color="gray", opacity=0.4)
fig.update_layout(
title="Prophet-like forecast (trend + Fourier seasonality)",
xaxis_title="date",
yaxis_title="y",
height=450,
)
fig.show()
/home/tempa/miniconda3/lib/python3.12/site-packages/plotly/io/_json.py:558: UserWarning:
Discarding nonzero nanoseconds in conversion.
# Component view (from-scratch backend)
if fitted is None:
raise RuntimeError("Component plot is implemented for the from_scratch backend in this notebook")
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08, subplot_titles=("Trend", "Seasonality"))
fig.add_trace(go.Scatter(x=fitted["ds"], y=fitted["trend"], mode="lines", name="trend"), row=1, col=1)
fig.add_trace(go.Scatter(x=fitted["ds"], y=fitted["seasonality"], mode="lines", name="seasonality"), row=2, col=1)
fig.update_yaxes(title_text="trend", row=1, col=1)
fig.update_yaxes(title_text="seasonality", row=2, col=1)
fig.update_xaxes(title_text="date", row=2, col=1)
fig.update_layout(height=550, showlegend=False)
fig.show()
Notes (wrapper vs from-scratch)#
The official Prophet model is fitted using a Stan backend (MAP or MCMC) with priors chosen to regularize changepoints and seasonalities.
The from-scratch
ProphetLikehere uses ridge regression on:trend basis \([1, t, \max(0,t-s_1), \ldots, \max(0,t-s_K)]\)
Fourier seasonal basis for weekly/yearly (and any
add_seasonalityspecs)
It is meant for intuition and learning, not as a full drop-in replacement (no holiday effects, no logistic growth, no calibrated uncertainty intervals).